/*
 * Copyright (C) 2012 Clearspring Technologies, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.geode.internal.hll;

import org.apache.geode.redis.internal.executor.hll.HllExecutor;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.io.Serializable;

/**
 * Java implementation of HyperLogLog (HLL) algorithm from this paper:
 * <p/>
 * http://algo.inria.fr/flajolet/Publications/FlFuGaMe07.pdf
 * <p/>
 * HLL is an improved version of LogLog that is capable of estimating
 * the cardinality of a set with accuracy = 1.04/sqrt(m) where
 * m = 2^b.  So we can control accuracy vs space usage by increasing
 * or decreasing b.
 * <p/>
 * The main benefit of using HLL over LL is that it only requires 64%
 * of the space that LL does to get the same accuracy.
 * <p/>
 * This implementation implements a single counter.  If a large (millions)
 * number of counters are required you may want to refer to:
 * <p/>
 * http://dsiutils.dsi.unimi.it/
 * <p/>
 * It has a more complex implementation of HLL that supports multiple counters
 * in a single object, drastically reducing the java overhead from creating
 * a large number of objects.
 * <p/>
 * This implementation leveraged a javascript implementation that Yammer has
 * been working on:
 * <p/>
 * https://github.com/yammer/probablyjs
 * <p>
 * Note that this implementation does not include the long range correction function
 * defined in the original paper.  Empirical evidence shows that the correction
 * function causes more harm than good.
 * </p>
 * <p/>
 * <p>
 * Users have different motivations to use different types of hashing functions.
 * Rather than try to keep up with all available hash functions and to remove
 * the concern of causing future binary incompatibilities this class allows clients
 * to offer the value in hashed int or long form.  This way clients are free
 * to change their hash function on their own time line.  We recommend using Google's
 * Guava Murmur3_128 implementation as it provides good performance and speed when
 * high precision is required.  In our tests the 32bit MurmurHash function included
 * in this project is faster and produces better results than the 32 bit murmur3
 * implementation google provides.
 * </p>
 */
public class HyperLogLog implements ICardinality, Serializable {

  private static final long serialVersionUID = -4661220245111112301L;
  private final RegisterSet registerSet;
  private final int log2m;
  private final double alphaMM;


  /**
   * Create a new HyperLogLog instance using the specified standard deviation.
   *
   * @param rsd - the relative standard deviation for the counter.
   *            smaller values create counters that require more space.
   */
  public HyperLogLog(double rsd) {
    this(log2m(rsd));
  }

  private static int log2m(double rsd) {
    return (int) (Math.log((1.106 / rsd) * (1.106 / rsd)) / Math.log(2));
  }

  /**
   * Create a new HyperLogLog instance.  The log2m parameter defines the accuracy of
   * the counter.  The larger the log2m the better the accuracy.
   * <p/>
   * accuracy = 1.04/sqrt(2^log2m)
   *
   * @param log2m - the number of bits to use as the basis for the HLL instance
   */
  public HyperLogLog(int log2m) {
    this(log2m, new RegisterSet(1 << log2m));
  }

  /**
   * Creates a new HyperLogLog instance using the given registers.  Used for unmarshalling a serialized
   * instance and for merging multiple counters together.
   *
   * @param registerSet - the initial values for the register set
   */
  @Deprecated
  public HyperLogLog(int log2m, RegisterSet registerSet) {
    if (log2m < 0 || log2m > 30) {
      throw new IllegalArgumentException("log2m argument is "
          + log2m + " and is outside the range [0, 30]");
    }
    this.registerSet = registerSet;
    this.log2m = log2m;
    int m = 1 << this.log2m;

    alphaMM = getAlphaMM(log2m, m);
  }

  @Override
  public boolean offerHashed(long hashedValue) {
    // j becomes the binary address determined by the first b log2m of x
    // j will be between 0 and 2^log2m
    final int j = (int) (hashedValue >>> (Long.SIZE - log2m));
    final int r = Long.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
    return registerSet.updateIfGreater(j, r);
  }

  @Override
  public boolean offerHashed(int hashedValue) {
    // j becomes the binary address determined by the first b log2m of x
    // j will be between 0 and 2^log2m
    final int j = hashedValue >>> (Integer.SIZE - log2m);
    final int r = Integer.numberOfLeadingZeros((hashedValue << this.log2m) | (1 << (this.log2m - 1)) + 1) + 1;
    return registerSet.updateIfGreater(j, r);
  }

  @Override
  public boolean offer(Object o) {
    final int x = MurmurHash.hash(o);
    return offerHashed(x);
  }


  @Override
  public long cardinality() {
    double registerSum = 0;
    int count = registerSet.count;
    double zeros = 0.0;
    for (int j = 0; j < registerSet.count; j++) {
      int val = registerSet.get(j);
      registerSum += 1.0 / (1 << val);
      if (val == 0) {
        zeros++;
      }
    }

    double estimate = alphaMM * (1 / registerSum);

    if (estimate <= (5.0 / 2.0) * count) {
      // Small Range Estimate
      return Math.round(linearCounting(count, zeros));
    } else {
      return Math.round(estimate);
    }
  }

  @Override
  public int sizeof() {
    return registerSet.size * 4;
  }

  @Override
  public byte[] getBytes() throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    DataOutput dos = new DataOutputStream(baos);
    writeBytes(dos);

    return baos.toByteArray();
  }

  private void writeBytes(DataOutput serializedByteStream) throws IOException {
    serializedByteStream.writeInt(log2m);
    serializedByteStream.writeInt(registerSet.size * 4);
    for (int x : registerSet.readOnlyBits()) {
      serializedByteStream.writeInt(x);
    }
  }

  /**
   * Add all the elements of the other set to this set.
   * <p/>
   * This operation does not imply a loss of precision.
   *
   * @param other A compatible Hyperloglog instance (same log2m)
   * @throws CardinalityMergeException if other is not compatible
   */
  public void addAll(HyperLogLog other) throws CardinalityMergeException {
    if (this.sizeof() != other.sizeof()) {
      throw new HyperLogLogMergeException("Cannot merge estimators of different sizes");
    }

    registerSet.merge(other.registerSet);
  }

  @Override
  public ICardinality merge(ICardinality... estimators) throws CardinalityMergeException {
    HyperLogLog merged = new HyperLogLog(HllExecutor.DEFAULT_HLL_STD_DEV);//new HyperLogLog(log2m, new RegisterSet(this.registerSet.count));
    merged.addAll(this);

    if (estimators == null) {
      return merged;
    }

    for (ICardinality estimator : estimators) {
      if (!(estimator instanceof HyperLogLog)) {
        throw new HyperLogLogMergeException("Cannot merge estimators of different class");
      }
      HyperLogLog hll = (HyperLogLog) estimator;
      merged.addAll(hll);
    }

    return merged;
  }

  private Object writeReplace() {
    return new SerializationHolder(this);
  }

  /**
   * This class exists to support Externalizable semantics for
   * HyperLogLog objects without having to expose a public
   * constructor, public write/read methods, or pretend final
   * fields aren't final.
   *
   * In short, Externalizable allows you to skip some of the more
   * verbose meta-data default Serializable gets you, but still
   * includes the class name. In that sense, there is some cost
   * to this holder object because it has a longer class name. I
   * imagine people who care about optimizing for that have their
   * own work-around for long class names in general, or just use
   * a custom serialization framework. Therefore we make no attempt
   * to optimize that here (eg. by raising this from an inner class
   * and giving it an unhelpful name).
   */
  private static class SerializationHolder implements Externalizable {

    HyperLogLog hyperLogLogHolder;

    public SerializationHolder(HyperLogLog hyperLogLogHolder) {
      this.hyperLogLogHolder = hyperLogLogHolder;
    }

    /**
     * required for Externalizable
     */
    @SuppressWarnings("unused")
    public SerializationHolder() {

    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
      hyperLogLogHolder.writeBytes(out);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
      hyperLogLogHolder = Builder.build(in);
    }

    private Object readResolve() {
      return hyperLogLogHolder;
    }
  }

  public static class Builder implements IBuilder<ICardinality>, Serializable {

    private static final long serialVersionUID = -979314356097156719L;
    private double rsd;

    public Builder(double rsd) {
      this.rsd = rsd;
    }

    @Override
    public HyperLogLog build() {
      return new HyperLogLog(rsd);
    }

    @Override
    public int sizeof() {
      int log2m = log2m(rsd);
      int k = 1 << log2m;
      return RegisterSet.getBits(k) * 4;
    }

    public static HyperLogLog build(byte[] bytes) throws IOException {
      ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
      return build(new DataInputStream(bais));
    }

    public static HyperLogLog build(DataInput serializedByteStream) throws IOException {
      int log2m = serializedByteStream.readInt();
      int byteArraySize = serializedByteStream.readInt();
      return new HyperLogLog(log2m,
          new RegisterSet(1 << log2m, Bits.getBits(serializedByteStream, byteArraySize)));
    }
  }

  @SuppressWarnings("serial")
  protected static class HyperLogLogMergeException extends CardinalityMergeException {

    public HyperLogLogMergeException(String message) {
      super(message);
    }
  }

  protected static double getAlphaMM(final int p, final int m) {
    // See the paper.
    switch (p) {
    case 4:
      return 0.673 * m * m;
    case 5:
      return 0.697 * m * m;
    case 6:
      return 0.709 * m * m;
    default:
      return (0.7213 / (1 + 1.079 / m)) * m * m;
    }
  }

  protected static double linearCounting(int m, double V) {
    return m * Math.log(m / V);
  }
}
